{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc=SparkContext(appName=\"DataFrame_d1\") \n",
    "spark=SparkSession.builder.appName('d1').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "staff=[('mike',30,'finance',24000),('lee',34,'develop',36000),('allen',36,'manager',40000)] "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_staff=spark.createDataFrame(staff,['staff','age','dept','salary'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "#from pyspark.sql import Column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "pyspark.sql.column.Column"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(df_staff.staff)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Column<salary>"
      ]
     },
     "execution_count": 27,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_staff.salary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(name=u'mike', dept=u'finance'),\n",
       " Row(name=u'lee', dept=u'develop'),\n",
       " Row(name=u'allen', dept=u'manager')]"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_staff.select(df_staff.staff.alias('name'),df_staff.dept).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "salary_asc=df_staff.select(df_staff.staff,df_staff.salary).orderBy(df_staff.staff.asc())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "pyspark.sql.dataframe.DataFrame"
      ]
     },
     "execution_count": 35,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(salary_asc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(staff=u'allen', salary=40000),\n",
       " Row(staff=u'lee', salary=36000),\n",
       " Row(staff=u'mike', salary=24000)]"
      ]
     },
     "execution_count": 36,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "salary_asc.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "df=spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], [\"name\", \"height\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(name=None), Row(name=u'Alice'), Row(name=u'Tom')]"
      ]
     },
     "execution_count": 38,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.select(df.name).orderBy(df.name.asc_nulls_first()).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "d=df_staff.select(df_staff.staff,df_staff.age.cast('string'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(staff=u'mike', age=u'30'),\n",
       " Row(staff=u'lee', age=u'34'),\n",
       " Row(staff=u'allen', age=u'36')]"
      ]
     },
     "execution_count": 40,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "d.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(staff=u'mike', contains(staff, ee)=False),\n",
       " Row(staff=u'lee', contains(staff, ee)=True),\n",
       " Row(staff=u'allen', contains(staff, ee)=False)]"
      ]
     },
     "execution_count": 43,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_staff.select(df_staff.staff,df_staff.staff.contains('ee')).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+---------------+\n",
      "|staff|dept LIKE deve%|\n",
      "+-----+---------------+\n",
      "| mike|          false|\n",
      "|  lee|           true|\n",
      "|allen|          false|\n",
      "+-----+---------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_staff.select(df_staff.staff,df_staff.dept.like('deve%')).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(staff=u'mike', salary=24000, startswith(staff, m)=True),\n",
       " Row(staff=u'lee', salary=36000, startswith(staff, m)=False),\n",
       " Row(staff=u'allen', salary=40000, startswith(staff, m)=False)]"
      ]
     },
     "execution_count": 52,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_staff.select(df_staff.staff,df_staff.salary,df_staff.staff.startswith('m')).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(staff=u'mike', age=30, dept=u'finance', salary=24000),\n",
       " Row(staff=u'lee', age=34, dept=u'develop', salary=36000)]"
      ]
     },
     "execution_count": 54,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df_staff.filter(df_staff.staff.endswith('e')).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "row = Row( name='苹果',unit_price=20, amount=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "pyspark.sql.types.Row"
      ]
     },
     "execution_count": 61,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(row)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Row(amount=10, name='\\xe8\\x8b\\xb9\\xe6\\x9e\\x9c', unit_price=20)"
      ]
     },
     "execution_count": 62,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "row"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "metadata": {},
   "outputs": [],
   "source": [
    "row_dict=row.asDict()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 65,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "dict"
      ]
     },
     "execution_count": 65,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(row_dict)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 66,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'amount': 10, 'name': '\\xe8\\x8b\\xb9\\xe6\\x9e\\x9c', 'unit_price': 20}"
      ]
     },
     "execution_count": 66,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "row_dict"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 74,
   "metadata": {},
   "outputs": [],
   "source": [
    "df=spark.createDataFrame([Row(no=1, value='foo'), Row(no=2, value=None),Row(no=2, value='fun')])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 80,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(no=1, value=u'foo'), Row(no=2, value=None), Row(no=2, value=u'fun')]"
      ]
     },
     "execution_count": 80,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 72,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['no', 'value']"
      ]
     },
     "execution_count": 72,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.getItem('no')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 77,
   "metadata": {},
   "outputs": [],
   "source": [
    "row_1=df.collect()[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 78,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "pyspark.sql.types.Row"
      ]
     },
     "execution_count": 78,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "type(row_1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 79,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 79,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "'no' in row_1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 81,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "False"
      ]
     },
     "execution_count": 81,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "'name' in row_1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 84,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 96,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(no=1, value=u'foo', CASE WHEN (no > 1) THEN 1 ELSE 0 END=0),\n",
       " Row(no=2, value=None, CASE WHEN (no > 1) THEN 1 ELSE 0 END=1),\n",
       " Row(no=2, value=u'fun', CASE WHEN (no > 1) THEN 1 ELSE 0 END=1)]"
      ]
     },
     "execution_count": 96,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.select(df.no,df.value,F.when(df.no > 1,1).otherwise(0)).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 98,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'no': 1, 'value': u'foo'}"
      ]
     },
     "execution_count": 98,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "row_1.asDict()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 106,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = spark.sparkContext.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 107,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import *"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 105,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema = StructType([\n",
    "        StructField(\"id\", LongType(), True),    \n",
    "        StructField(\"name\", StringType(), True),\n",
    "        StructField(\"age\", LongType(), True),\n",
    "        StructField(\"eyeColor\", StringType(), True)\n",
    "    ])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 108,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.createDataFrame(data, schema)  # 创建 DataFrame，并指定schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 109,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "StructType(List(StructField(id,LongType,true),StructField(name,StringType,true),StructField(age,LongType,true),StructField(eyeColor,StringType,true)))"
      ]
     },
     "execution_count": 109,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 114,
   "metadata": {},
   "outputs": [],
   "source": [
    "d1=[('red', 'wood', [100, 200, 20])]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 115,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema=StructType([\n",
    "             StructField('door_color', StringType()),\n",
    "             StructField('door_material', StringType()),\n",
    "             StructField('door_param', ArrayType(IntegerType()))])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 116,
   "metadata": {},
   "outputs": [],
   "source": [
    "df2 = spark.createDataFrame(d1, schema)  # 创建 DataFrame，并指定schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 117,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-------------+--------------+\n",
      "|door_color|door_material|    door_param|\n",
      "+----------+-------------+--------------+\n",
      "|       red|         wood|[100, 200, 20]|\n",
      "+----------+-------------+--------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df2.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 119,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'fields': [{'metadata': {},\n",
       "   'name': 'door_color',\n",
       "   'nullable': True,\n",
       "   'type': 'string'},\n",
       "  {'metadata': {},\n",
       "   'name': 'door_material',\n",
       "   'nullable': True,\n",
       "   'type': 'string'},\n",
       "  {'metadata': {},\n",
       "   'name': 'door_param',\n",
       "   'nullable': True,\n",
       "   'type': {'containsNull': True, 'elementType': 'integer', 'type': 'array'}}],\n",
       " 'type': 'struct'}"
      ]
     },
     "execution_count": 119,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "schema.jsonValue()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 120,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 120,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "schema.needConversion()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 122,
   "metadata": {},
   "outputs": [],
   "source": [
    "d2=[('red', 'wood', {'door_width':100}, {'door_height':200}, {'door_thickness':20})]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 126,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema=StructType([\n",
    "             StructField('door_color', StringType()),\n",
    "             StructField('door_material', StringType()),\n",
    "             StructField(\"width\", MapType(StringType(), IntegerType(), False), False),\n",
    "             StructField(\"height\", MapType(StringType(), IntegerType(), False), False),\n",
    "             StructField(\"thickness\", MapType(StringType(), IntegerType(), False), False)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 127,
   "metadata": {},
   "outputs": [],
   "source": [
    "df3 = spark.createDataFrame(d2, schema)  # 创建 DataFrame，并指定schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 129,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(door_color=u'red', door_material=u'wood', width={u'door_width': 100}, height={u'door_height': 200}, thickness={u'door_thickness': 20})]"
      ]
     },
     "execution_count": 129,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df3.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 131,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'struct<door_color:string,door_material:string,width:map<string,int>,height:map<string,int>,thickness:map<string,int>>'"
      ]
     },
     "execution_count": 131,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "schema.simpleString()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 132,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- door_color: string (nullable = true)\n",
      " |-- door_material: string (nullable = true)\n",
      " |-- width: map (nullable = false)\n",
      " |    |-- key: string\n",
      " |    |-- value: integer (valueContainsNull = false)\n",
      " |-- height: map (nullable = false)\n",
      " |    |-- key: string\n",
      " |    |-- value: integer (valueContainsNull = false)\n",
      " |-- thickness: map (nullable = false)\n",
      " |    |-- key: string\n",
      " |    |-- value: integer (valueContainsNull = false)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df3.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 133,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "StorageLevel(False, False, False, False, 1)"
      ]
     },
     "execution_count": 133,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df3.storageLevel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 134,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['door_color', 'door_material', 'width', 'height', 'thickness']"
      ]
     },
     "execution_count": 134,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df3.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 135,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('door_color', 'string'),\n",
       " ('door_material', 'string'),\n",
       " ('width', 'map<string,int>'),\n",
       " ('height', 'map<string,int>'),\n",
       " ('thickness', 'map<string,int>')]"
      ]
     },
     "execution_count": 135,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df3.dtypes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 139,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(door_color=u'red', door_material=u'wood', width={u'door_width': 100}, height={u'door_height': 200}, thickness={u'door_thickness': 20})]"
      ]
     },
     "execution_count": 139,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df3.rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 140,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "False"
      ]
     },
     "execution_count": 140,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df3.isStreaming"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
